# Archived web-listing header (non-code): home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.4)
-
- '''Database connection support
-
- $Id: Connection.py 40785 2005-12-14 21:49:51Z tim_one $'''
- import logging
- import sys
- import tempfile
- import threading
- import warnings
- from time import time
- from persistent import PickleCache
- from persistent.interfaces import IPersistentDataManager
- from ZODB.interfaces import IConnection
- from transaction.interfaces import ISavepointDataManager
- from transaction.interfaces import IDataManagerSavepoint
- from transaction.interfaces import ISynchronizer
- from zope.interface import implements
- import transaction
- from ZODB.ConflictResolution import ResolvedSerial
- from ZODB.ExportImport import ExportImport
- from ZODB import POSException
- from ZODB.POSException import InvalidObjectReference, ConnectionStateError
- from ZODB.POSException import ConflictError, ReadConflictError
- from ZODB.serialize import ObjectWriter, ObjectReader, myhasattr
- from ZODB.utils import p64, u64, z64, oid_repr, positive_id
global_reset_counter = 0

def resetCaches():
    """Cause all connection caches to be reset as connections are reopened.

    Zope's refresh feature uses this.  When Python modules are reloaded,
    existing instances keep their old class definitions.  Calling
    resetCaches() asks ZODB to drop connection caches so that objects
    loaded by subsequent connections pick up the new class definitions.
    """
    global global_reset_counter
    global_reset_counter += 1
-
-
class Connection(ExportImport, object):
    """Connection to ZODB for loading and storing objects."""
    # zope.interface declaration: a ZODB connection, a transaction data
    # manager with savepoint support, a persistent data manager, and a
    # transaction synchronizer.
    implements(IConnection, ISavepointDataManager, IPersistentDataManager, ISynchronizer)
    # NOTE(review): not used within this chunk; presumably compared
    # elsewhere to detect stale code after a reload -- confirm.
    _code_timestamp = 0
-
- def __init__(self, db, version = '', cache_size = 400):
- '''Create a new Connection.'''
- self._db = db
- self._normal_storage = self._storage = db._storage
- self.new_oid = db._storage.new_oid
- self._savepoint_storage = None
- self.transaction_manager = None
- self._synch = None
- self._mvcc = None
- self._log = logging.getLogger('ZODB.Connection')
- self._debug_info = ()
- self._opened = None
- self._version = version
- self._cache = cache = PickleCache(self, cache_size)
- if version:
- self._cache.cache_drain_resistance = 100
-
- self._committed = []
- self._added = { }
- self._added_during_commit = None
- self._reset_counter = global_reset_counter
- self._load_count = 0
- self._store_count = 0
- self._creating = { }
- self._modified = []
- self._registered_objects = []
- self._needs_to_join = True
- self._inv_lock = threading.Lock()
- self._invalidated = d = { }
- self._conflicts = { }
- self._txn_time = None
- self._import = None
- self._reader = ObjectReader(self, self._cache, self._db.classFactory)
- self.connections = {
- self._db.database_name: self }
-
-
- def add(self, obj):
- """Add a new object 'obj' to the database and assign it an oid."""
- if self._opened is None:
- raise ConnectionStateError('The database connection is closed')
-
- marker = object()
- oid = getattr(obj, '_p_oid', marker)
- if oid is marker:
- raise TypeError('Only first-class persistent objects may be added to a Connection.', obj)
- elif obj._p_jar is None:
- if not obj._p_oid is None:
- raise AssertionError
- oid = obj._p_oid = self._storage.new_oid()
- obj._p_jar = self
- if self._added_during_commit is not None:
- self._added_during_commit.append(obj)
-
- self._register(obj)
- self._added[oid] = obj
- elif obj._p_jar is not self:
- raise InvalidObjectReference(obj, obj._p_jar)
-
-
-
- def get(self, oid):
- """Return the persistent object with oid 'oid'."""
- if self._opened is None:
- raise ConnectionStateError('The database connection is closed')
-
- obj = self._cache.get(oid, None)
- if obj is not None:
- return obj
-
- obj = self._added.get(oid, None)
- if obj is not None:
- return obj
-
- (p, serial) = self._storage.load(oid, self._version)
- obj = self._reader.getGhost(p)
- obj._p_oid = oid
- obj._p_jar = self
- obj._p_changed = None
- obj._p_serial = serial
- self._cache[oid] = obj
- return obj
-
-
- def cacheMinimize(self):
- '''Deactivate all unmodified objects in the cache.'''
- self._cache.minimize()
-
-
- def cacheGC(self):
- '''Reduce cache size to target size.'''
- self._cache.incrgc()
-
- __onCloseCallbacks = None
-
- def onCloseCallback(self, f):
- '''Register a callable, f, to be called by close().'''
- if self._Connection__onCloseCallbacks is None:
- self._Connection__onCloseCallbacks = []
-
- self._Connection__onCloseCallbacks.append(f)
-
-
- def close(self, primary = True):
- '''Close the Connection.'''
- if not self._needs_to_join:
- raise ConnectionStateError('Cannot close a connection joined to a transaction')
-
- if self._cache is not None:
- self._cache.incrgc()
-
- if self._Connection__onCloseCallbacks is not None:
- for f in self._Connection__onCloseCallbacks:
-
- try:
- f()
- continue
- f = getattr(f, 'im_self', f)
- self._log.error('Close callback failed for %s', f, exc_info = sys.exc_info())
- continue
-
-
- self._Connection__onCloseCallbacks = None
-
- self._debug_info = ()
- if self._synch:
- self.transaction_manager.unregisterSynch(self)
- self._synch = None
-
- if primary:
- for connection in self.connections.values():
- if connection is not self:
- connection.close(False)
- continue
-
- if self._opened is not None:
- self._db._returnToPool(self)
-
- else:
- self._opened = None
-
-
- def db(self):
- '''Returns a handle to the database this connection belongs to.'''
- return self._db
-
-
- def isReadOnly(self):
- '''Returns True if the storage for this connection is read only.'''
- if self._opened is None:
- raise ConnectionStateError('The database connection is closed')
-
- return self._storage.isReadOnly()
-
-
- def invalidate(self, tid, oids):
- """Notify the Connection that transaction 'tid' invalidated oids."""
- self._inv_lock.acquire()
-
- try:
- if self._txn_time is None:
- self._txn_time = tid
-
- self._invalidated.update(oids)
- finally:
- self._inv_lock.release()
-
-
-
- def root(self):
- '''Return the database root object.'''
- return self.get(z64)
-
-
- def getVersion(self):
- '''Returns the version this connection is attached to.'''
- if self._storage is None:
- raise ConnectionStateError('The database connection is closed')
-
- return self._version
-
-
- def get_connection(self, database_name):
- '''Return a Connection for the named database.'''
- connection = self.connections.get(database_name)
- if connection is None:
- new_con = self._db.databases[database_name].open(transaction_manager = self.transaction_manager, mvcc = self._mvcc, version = self._version, synch = self._synch)
- self.connections.update(new_con.connections)
- new_con.connections = self.connections
- connection = new_con
-
- return connection
-
-
- def _implicitlyAdding(self, oid):
- '''Are we implicitly adding an object within the current transaction
-
- This is used in a check to avoid implicitly adding an object
- to a database in a multi-database situation.
- See serialize.ObjectWriter.persistent_id.
-
- '''
- if self._creating.get(oid, 0) and self._savepoint_storage is not None:
- pass
- return self._savepoint_storage.creating.get(oid, 0)
-
-
- def sync(self):
- '''Manually update the view on the database.'''
- self.transaction_manager.abort()
- self._storage_sync()
-
-
- def getDebugInfo(self):
- '''Returns a tuple with different items for debugging the
- connection.
- '''
- return self._debug_info
-
-
- def setDebugInfo(self, *args):
- '''Add the given items to the debug information of this connection.'''
- self._debug_info = self._debug_info + args
-
-
- def getTransferCounts(self, clear = False):
- '''Returns the number of objects loaded and stored.'''
- res = (self._load_count, self._store_count)
- if clear:
- self._load_count = 0
- self._store_count = 0
-
- return res
-
-
- def abort(self, transaction):
- '''Abort a transaction and forget all changes.'''
- self._abort()
- if self._savepoint_storage is not None:
- self._abort_savepoint()
-
- self._tpc_cleanup()
-
-
- def _abort(self):
- '''Abort a transaction and forget all changes.'''
- for obj in self._registered_objects:
- oid = obj._p_oid
- if not oid is not None:
- raise AssertionError
- if oid in self._added:
- del self._added[oid]
- del obj._p_jar
- del obj._p_oid
- continue
- self._cache.invalidate(oid)
-
-
-
- def _tpc_cleanup(self):
- '''Performs cleanup operations to support tpc_finish and tpc_abort.'''
- self._conflicts.clear()
- if not self._synch:
- self._flush_invalidations()
-
- self._needs_to_join = True
- self._registered_objects = []
- self._creating.clear()
-
-
- def _flush_invalidations(self):
- self._inv_lock.acquire()
-
- try:
- invalidated = self._invalidated
- self._invalidated = { }
- self._txn_time = None
- finally:
- self._inv_lock.release()
-
- self._cache.invalidate(invalidated)
- self._cache.incrgc()
-
-
- def tpc_begin(self, transaction):
- '''Begin commit of a transaction, starting the two-phase commit.'''
- self._modified = []
- self._creating.clear()
- self._normal_storage.tpc_begin(transaction)
-
-
- def commit(self, transaction):
- '''Commit changes to an object'''
- if self._savepoint_storage is not None:
- self.savepoint()
- self._commit_savepoint(transaction)
- else:
- self._commit(transaction)
-
-
- def _commit(self, transaction):
- '''Commit changes to an object'''
- if self._import:
- self._importDuringCommit(transaction, *self._import)
- self._import = None
-
- self._added_during_commit = []
- for obj in self._registered_objects:
- oid = obj._p_oid
- if not oid:
- raise AssertionError
- if oid in self._conflicts:
- raise ReadConflictError(object = obj)
-
- if obj._p_jar is not self:
- raise InvalidObjectReference(obj, obj._p_jar)
- elif oid in self._added:
- if not obj._p_serial == z64:
- raise AssertionError
- elif obj._p_changed:
- if oid in self._invalidated:
- resolve = getattr(obj, '_p_resolveConflict', None)
- if resolve is None:
- raise ConflictError(object = obj)
-
-
- self._modified.append(oid)
-
- self._store_objects(ObjectWriter(obj), transaction)
-
- for obj in self._added_during_commit:
- self._store_objects(ObjectWriter(obj), transaction)
-
- self._added_during_commit = None
-
-
- def _store_objects(self, writer, transaction):
- for obj in writer:
- oid = obj._p_oid
- serial = getattr(obj, '_p_serial', z64)
- if serial == z64:
- implicitly_adding = self._added.pop(oid, None) is None
- self._creating[oid] = implicitly_adding
- elif oid in self._invalidated and not hasattr(obj, '_p_resolveConflict'):
- raise ConflictError(object = obj)
-
- self._modified.append(oid)
- p = writer.serialize(obj)
- s = self._storage.store(oid, serial, p, self._version, transaction)
- self._store_count += 1
-
- try:
- self._cache[oid] = obj
- except:
- self
- if hasattr(obj, 'aq_base'):
- self._cache[oid] = obj.aq_base
- else:
- raise
-
- self._handle_serial(s, oid)
-
-
-
- def _handle_serial(self, store_return, oid = None, change = 1):
- '''Handle the returns from store() and tpc_vote() calls.'''
- if not store_return:
- return None
-
- if isinstance(store_return, str):
- if not oid is not None:
- raise AssertionError
- self._handle_one_serial(oid, store_return, change)
- else:
- for oid, serial in store_return:
- self._handle_one_serial(oid, serial, change)
-
-
-
- def _handle_one_serial(self, oid, serial, change):
- if not isinstance(serial, str):
- raise serial
-
- obj = self._cache.get(oid, None)
- if obj is None:
- return None
-
- if serial == ResolvedSerial:
- del obj._p_changed
- elif change:
- obj._p_changed = 0
-
- obj._p_serial = serial
-
-
- def tpc_abort(self, transaction):
- if self._import:
- self._import = None
-
- if self._savepoint_storage is not None:
- self._abort_savepoint()
-
- self._storage.tpc_abort(transaction)
- self._cache.invalidate(self._modified)
- self._invalidate_creating()
- while self._added:
- (oid, obj) = self._added.popitem()
- del obj._p_oid
- del obj._p_jar
- self._tpc_cleanup()
-
-
- def _invalidate_creating(self, creating = None):
- '''Disown any objects newly saved in an uncommitted transaction.'''
- if creating is None:
- creating = self._creating
- self._creating = { }
-
- for oid in creating:
- o = self._cache.get(oid)
- if o is not None:
- del self._cache[oid]
- del o._p_jar
- del o._p_oid
- continue
-
-
-
- def tpc_vote(self, transaction):
- '''Verify that a data manager can commit the transaction.'''
-
- try:
- vote = self._storage.tpc_vote
- except AttributeError:
- return None
-
- s = vote(transaction)
- self._handle_serial(s)
-
-
- def tpc_finish(self, transaction):
- '''Indicate confirmation that the transaction is done.'''
-
- def callback(tid):
- d = dict.fromkeys(self._modified)
- self._db.invalidate(tid, d, self)
-
- self._storage.tpc_finish(transaction, callback)
- self._tpc_cleanup()
-
-
- def sortKey(self):
- '''Return a consistent sort key for this connection.'''
- return '%s:%s' % (self._storage.sortKey(), id(self))
-
-
- def beforeCompletion(self, txn):
- pass
-
-
- def _storage_sync(self, *ignored):
- sync = getattr(self._storage, 'sync', 0)
- if sync:
- sync()
-
- self._flush_invalidations()
-
- afterCompletion = _storage_sync
- newTransaction = _storage_sync
-
- def oldstate(self, obj, tid):
- """Return copy of 'obj' that was written by transaction 'tid'."""
- if not obj._p_jar is self:
- raise AssertionError
- p = self._storage.loadSerial(obj._p_oid, tid)
- return self._reader.getState(p)
-
-
- def setstate(self, obj):
- """Turns the ghost 'obj' into a real object by loading it's from the
- database."""
- oid = obj._p_oid
- if self._opened is None:
- msg = "Shouldn't load state for %s when the connection is closed" % oid_repr(oid)
- self._log.error(msg)
- raise ConnectionStateError(msg)
-
-
- try:
- self._setstate(obj)
- except ConflictError:
- raise
- except:
- self._log.error("Couldn't load state for %s", oid_repr(oid), exc_info = sys.exc_info())
- raise
-
-
-
- def _setstate(self, obj):
- if obj._p_oid in self._invalidated and not myhasattr(obj, '_p_independent'):
- self._load_before_or_conflict(obj)
- return None
-
- (p, serial) = self._storage.load(obj._p_oid, self._version)
- self._load_count += 1
- self._inv_lock.acquire()
-
- try:
- invalid = obj._p_oid in self._invalidated
- finally:
- self._inv_lock.release()
-
- if invalid:
- if myhasattr(obj, '_p_independent'):
- self._handle_independent(obj)
- else:
- self._load_before_or_conflict(obj)
- return None
-
- self._reader.setGhostState(obj, p)
- obj._p_serial = serial
-
-
- def _load_before_or_conflict(self, obj):
- '''Load non-current state for obj or raise ReadConflictError.'''
- if not self._mvcc and self._setstate_noncurrent(obj):
- self._register(obj)
- self._conflicts[obj._p_oid] = True
- raise ReadConflictError(object = obj)
-
-
-
- def _setstate_noncurrent(self, obj):
- '''Set state using non-current data.
-
- Return True if state was available, False if not.
- '''
-
- try:
- t = self._storage.loadBefore(obj._p_oid, self._txn_time)
- except KeyError:
- return False
-
- if t is None:
- return False
-
- (data, start, end) = t
- if not start < self._txn_time:
- raise AssertionError, (u64(start), u64(self._txn_time))
- if not end is not None:
- raise AssertionError
- if not self._txn_time <= end:
- raise AssertionError, (u64(self._txn_time), u64(end))
- self._reader.setGhostState(obj, data)
- obj._p_serial = start
- return True
-
-
- def _handle_independent(self, obj):
- if obj._p_independent():
- self._inv_lock.acquire()
-
- try:
- del self._invalidated[obj._p_oid]
- except KeyError:
- pass
- finally:
- self._inv_lock.release()
-
- else:
- self._conflicts[obj._p_oid] = 1
- self._register(obj)
- raise ReadConflictError(object = obj)
-
-
- def register(self, obj):
- '''Register obj with the current transaction manager.
-
- A subclass could override this method to customize the default
- policy of one transaction manager for each thread.
-
- obj must be an object loaded from this Connection.
- '''
- if not obj._p_jar is self:
- raise AssertionError
- if obj._p_oid is None:
- raise ValueError('assigning to _p_jar is not supported')
- elif obj._p_oid in self._added:
- return None
-
- self._register(obj)
-
-
- def _register(self, obj = None):
- if self._needs_to_join:
- self.transaction_manager.get().join(self)
- self._needs_to_join = False
-
- if obj is not None:
- self._registered_objects.append(obj)
-
-
-
- def _cache_items(self):
- items = self._cache.lru_items()
- everything = self._cache.cache_data
- for k, v in items:
- del everything[k]
-
- return everything.items() + items
-
-
- def open(self, transaction_manager = None, mvcc = True, synch = True, delegate = True):
- '''Register odb, the DB that this Connection uses.
-
- This method is called by the DB every time a Connection
- is opened. Any invalidations received while the Connection
- was closed will be processed.
-
- If the global module function resetCaches() was called, the
- cache will be cleared.
-
- Parameters:
- odb: database that owns the Connection
- mvcc: boolean indicating whether MVCC is enabled
- transaction_manager: transaction manager to use. None means
- use the default transaction manager.
- synch: boolean indicating whether Connection should
- register for afterCompletion() calls.
- '''
- self._opened = time()
- self._synch = synch
- if mvcc:
- pass
- self._mvcc = not (self._version)
- if transaction_manager is None:
- transaction_manager = transaction.manager
-
- self.transaction_manager = transaction_manager
- if self._reset_counter != global_reset_counter:
- self._resetCache()
- else:
- self._flush_invalidations()
- if synch:
- transaction_manager.registerSynch(self)
-
- if self._cache is not None:
- self._cache.incrgc()
-
- if delegate:
- for connection in self.connections.values():
- if connection is not self:
- connection.open(transaction_manager, mvcc, synch, False)
- continue
-
-
-
-
- def _resetCache(self):
- '''Creates a new cache, discarding the old one.
-
- See the docstring for the resetCaches() function.
- '''
- self._reset_counter = global_reset_counter
- self._invalidated.clear()
- cache_size = self._cache.cache_size
- self._cache = cache = PickleCache(self, cache_size)
-
-
- def __repr__(self):
- if self._version:
- ver = ' (in version %s)' % `self._version`
- else:
- ver = ''
- return '<Connection at %08x%s>' % (positive_id(self), ver)
-
- __getitem__ = get
-
- def modifiedInVersion(self, oid):
- """Returns the version the object with the given oid was modified in.
-
- If it wasn't modified in a version, the current version of this
- connection is returned.
- """
-
- try:
- return self._db.modifiedInVersion(oid)
- except KeyError:
- return self.getVersion()
-
-
-
    def exchange(self, old, new):
        """Replace *old* with *new* in the cache, registering *new* as changed."""
        # new takes over old's identity (oid) within this connection.
        oid = old._p_oid
        new._p_oid = oid
        new._p_jar = self
        new._p_changed = 1
        self._register(new)
        self._cache[oid] = new
-
-
- def savepoint(self):
- if self._savepoint_storage is None:
- self._savepoint_storage = TmpStore(self._version, self._normal_storage)
- self._storage = self._savepoint_storage
-
- self._creating.clear()
- self._commit(None)
- self._storage.creating.update(self._creating)
- self._creating.clear()
- self._registered_objects = []
- state = (self._storage.position, self._storage.index.copy())
- result = Savepoint(self, state)
- self.cacheGC()
- return result
-
-
- def _rollback(self, state):
- self._abort()
- self._registered_objects = []
- src = self._storage
- self._cache.invalidate(src.index)
- src.reset(*state)
-
-
- def _commit_savepoint(self, transaction):
- '''Commit all changes made in subtransactions and begin 2-phase commit
- '''
- src = self._savepoint_storage
- self._storage = self._normal_storage
- self._savepoint_storage = None
- self._log.debug('Commiting savepoints of size %s', src.getSize())
- oids = src.index.keys()
- self._modified.extend(oids)
- self._creating.update(src.creating)
- for oid in oids:
- (data, serial) = src.load(oid, src)
- s = self._storage.store(oid, serial, data, self._version, transaction)
- self._handle_serial(s, oid, change = False)
-
- src.close()
-
-
- def _abort_savepoint(self):
- '''Discard all subtransaction data.'''
- src = self._savepoint_storage
- self._storage = self._normal_storage
- self._savepoint_storage = None
- self._cache.invalidate(src.index)
- self._invalidate_creating(src.creating)
- src.close()
-
-
-
class Savepoint:
    """Savepoint handle: rolls its Connection back to a recorded state."""
    implements(IDataManagerSavepoint)

    def __init__(self, datamanager, state):
        # datamanager: the Connection; state: (position, index) snapshot
        # of its savepoint storage at savepoint time.
        self.datamanager = datamanager
        self.state = state


    def rollback(self):
        """Restore the data manager to the saved state."""
        self.datamanager._rollback(self.state)
-
-
-
class TmpStore:
    """A storage-like thing to support savepoints.

    Records are appended to a temporary file; self.index maps an oid to
    the file position of its most recent record.  Loads for oids not in
    the index fall through to the wrapped storage.
    """

    def __init__(self, base_version, storage):
        self._storage = storage
        # Delegate the read-only storage API straight to the real storage.
        for method in ('getName', 'new_oid', 'modifiedInVersion', 'getSize',
                       'undoLog', 'versionEmpty', 'sortKey', 'loadBefore'):
            setattr(self, method, getattr(storage, method))

        self._base_version = base_version
        self._file = tempfile.TemporaryFile()
        # Current end-of-data offset (was the Python-2-only literal 0x0L).
        self.position = 0
        # oid -> position of the most recent record for that oid.
        self.index = {}
        # oid -> implicitly-creating flag, mirrored from the Connection.
        self.creating = {}

    def __len__(self):
        return len(self.index)

    def close(self):
        self._file.close()

    def load(self, oid, version):
        """Return (data, serial) for *oid*, falling back to the base storage."""
        pos = self.index.get(oid)
        if pos is None:
            return self._storage.load(oid, self._base_version)

        # Record layout: 8-byte oid length, oid, 8-byte serial,
        # 8-byte data length, data.
        self._file.seek(pos)
        h = self._file.read(8)
        oidlen = u64(h)
        read_oid = self._file.read(oidlen)
        if read_oid != oid:
            raise POSException.StorageSystemError('Bad temporary storage')

        h = self._file.read(16)
        size = u64(h[8:])
        serial = h[:8]
        return self._file.read(size), serial

    def store(self, oid, serial, data, version, transaction):
        """Append a record for *oid* and return its serial."""
        # A store request is always from a single Connection; no locking.
        assert version == self._base_version
        self._file.seek(self.position)
        if serial is None:
            serial = z64
        header = p64(len(oid)) + oid + serial + p64(len(data))
        self._file.write(header)
        self._file.write(data)
        self.index[oid] = self.position
        self.position += len(data) + len(header)
        return serial

    def reset(self, position, index):
        """Roll back to *position*, restoring a copy of *index*."""
        self._file.truncate(position)
        self.position = position
        self.index = index.copy()
-
-
-